feat: chunk protocol, storage handler, and cross-node e2e tests #4
mickvandijke merged 18 commits into main
Conversation
…ode e2e test

Wire up the ANT chunk protocol to the production node and test infrastructure so nodes can store and retrieve chunks over P2P.

Production node (src/node.rs):
- Initialize AntProtocol (disk storage + payment + quoting) during build
- Route incoming P2P chunk messages to the handler, spawning a task per message for concurrent processing
- Clean up protocol task on shutdown

Protocol & storage modules (new):
- ant_protocol: bincode-serialized ChunkMessage enum (Put/Get/Quote)
- storage/disk: content-addressed chunk storage with SHA256, sharded dirs, atomic writes, and optional verification on read
- storage/handler: AntProtocol handler that ties together DiskStorage, PaymentVerifier, and QuoteGenerator

Test infrastructure (tests/e2e/testnet.rs):
- Each TestNode runs its own protocol routing loop (mirrors production)
- Add store_chunk_on() and get_chunk_from() for cross-node operations

E2E test (tests/e2e/data_types/chunk.rs):
- test_chunk_store_on_remote_node: node 3 asks node 0 to store a chunk via P2P, retrieves it back, and verifies data integrity

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Add store_chunk_on_peer() and get_chunk_from_peer() methods that accept a peer ID string directly, enabling cross-node chunk operations with dynamically discovered peers. Update test_chunk_store_on_remote_node to pick a random connected peer instead of hardcoding node 0, and remove the #[ignore] attribute now that the saorsa-core transport peer ID fix makes P2P message routing work correctly. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Validates the fundamental send_message / subscribe_events layer that higher-level protocols are built on. Sends a message from a regular node to a connected peer and verifies delivery via broadcast event streams. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
…definition Move the SHA256 content-address computation from DiskStorage into a standalone public function in client::data_types, eliminating duplicated hashing logic across DataChunk methods, DiskStorage, handler, and tests. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
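The single shared definition this commit describes can be sketched dependency-free. This is an illustration only: it uses the standard library's `DefaultHasher` as a stand-in for the SHA256 digest the crate actually uses, and the function name mirrors, but is not taken from, the real `client::data_types::compute_address`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Stand-in for the centralized address computation: one public function
/// that every caller (chunk methods, disk storage, handler, tests) uses,
/// so the address derivation cannot drift between modules.
/// Production hashes with SHA256; DefaultHasher keeps this sketch
/// dependency-free.
fn compute_address(content: &[u8]) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let data = b"hello chunk";
    // Content-addressing: identical bytes always map to the same address.
    assert_eq!(compute_address(data), compute_address(data));
    // Different content yields a different address (with overwhelming probability).
    assert_ne!(compute_address(data), compute_address(b"other"));
}
```

The design point is deduplication: once the hash lives in one public function, `DataChunk`, `DiskStorage`, the handler, and the tests all delegate to it instead of re-implementing the digest.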
- Alphabetical ordering of module declarations and re-exports in lib.rs
- Add clippy::panic allow on test modules (chunk.rs, handler.rs)
- Fix ignored_unit_patterns: `_ = &mut timeout` → `() = &mut timeout`
- Move function-level imports to module level (config.rs)
- Fix stale protocol_id in doc comments (saorsa/autonomi → saorsa/ant)
- Deduplicate XorName type alias: single definition in ant_protocol, imported by storage::disk and re-exported from storage::mod
- DRY: ChunkTestFixture::compute_address delegates to DiskStorage
- cargo fmt applied across all touched files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
This PR implements the autonomi protocol layer for chunk storage and retrieval, enabling cross-node communication through bincode-serialized messages. It introduces a content-addressed disk storage system, payment verification infrastructure, and protocol routing that dispatches incoming P2P messages to the appropriate handlers.
Changes:
- Added chunk protocol with PUT/GET/Quote message types and bincode serialization
- Implemented sharded disk storage with atomic writes and content verification
- Created `AntProtocol` handler that validates addresses, verifies payments, and persists chunks
- Integrated protocol routing in `RunningNode` to dispatch chunk messages from P2P events
- Added cross-node e2e tests validating the full request/response cycle
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/e2e/testnet.rs | Added AntProtocol integration to test nodes and cross-node chunk operations via P2P messaging |
| tests/e2e/live_testnet.rs | Replaced local address computation with centralized compute_address function |
| tests/e2e/integration_tests.rs | Added node-to-node messaging test validating P2P event subscription and message delivery |
| tests/e2e/data_types/chunk.rs | Added cross-node chunk storage test and replaced duplicate address computation logic |
| src/storage/mod.rs | Module definition for storage subsystem with disk and handler components |
| src/storage/handler.rs | AntProtocol handler implementing message routing, payment verification, and storage operations |
| src/storage/disk.rs | Content-addressed disk storage with two-level sharding and atomic write operations |
| src/payment/mod.rs | Exposed CacheStats and EvmVerifierConfig for external use |
| src/node.rs | Integrated AntProtocol creation and protocol message routing background task |
| src/lib.rs | Added public exports for protocol types and storage components |
| src/error.rs | Added Protocol and InvalidChunk error variants |
| src/config.rs | Added StorageConfig for chunk persistence settings |
| src/client/mod.rs | Exported compute_address function |
| src/client/data_types.rs | Centralized SHA256 address computation in public function |
| src/ant_protocol/mod.rs | Module definition for ANT protocol with chunk message types |
| src/ant_protocol/chunk.rs | Wire protocol message definitions with bincode encoding/decoding |
| Cargo.toml | Moved bincode from dev-dependencies to dependencies |
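As a rough illustration of the two-level sharding mentioned for src/storage/disk.rs above, the path layout can be sketched with the standard library alone. The shard widths and names here are assumptions for illustration, not the crate's actual layout:

```rust
use std::path::PathBuf;

/// Derive a two-level sharded path for a hex-encoded chunk address:
/// the first two hex chars pick the top-level shard directory, the next
/// two pick the second level, so no single directory accumulates all
/// chunks. Assumes the address is at least 4 hex chars long.
fn chunk_path(root: &str, address_hex: &str) -> PathBuf {
    let shard1 = &address_hex[0..2];
    let shard2 = &address_hex[2..4];
    PathBuf::from(root).join(shard1).join(shard2).join(address_hex)
}

fn main() {
    let p = chunk_path("/var/chunks", "ab12cdef");
    assert_eq!(p, PathBuf::from("/var/chunks/ab/12/ab12cdef"));
}
```

With 256 × 256 shard directories, even millions of chunks keep each leaf directory small, which matters for filesystems whose directory lookups degrade with entry count.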
Comments suppressed due to low confidence (2)
tests/e2e/testnet.rs:1
- Corrected article from 'a' to 'an' before 'AntProtocol' (vowel sound).
//! Test network infrastructure for spawning and managing multiple nodes.
src/storage/handler.rs:1
- Inconsistent spacing in ASCII diagram: two spaces before the closing pipe character, while other lines use one space. Should use consistent single space throughout the diagram.
//! ANT protocol handler for autonomi protocol messages.
```rust
let futs: Vec<_> = all_rx
    .iter_mut()
    .enumerate()
    .map(|(i, rx)| Box::pin(async move { (i, rx.recv().await) }))
```

The variable `i` is enumerated but never used in the mapped closure body. It should be prefixed with an underscore as `_i` to indicate it's intentionally unused, or replaced with `_` if the index is genuinely not needed.

```diff
-    .map(|(i, rx)| Box::pin(async move { (i, rx.recv().await) }))
+    .map(|(_, rx)| Box::pin(async move { rx.recv().await }))
```
```rust
//! - **Chunk**: Immutable, content-addressed data (hash == address)
//! - *Scratchpad*: Mutable, owner-indexed data (planned)
//! - *Pointer*: Lightweight mutable references (planned)
//! - *`GraphEntry`*: DAG entries with parent links (planned)
```

Mixed formatting inconsistency: GraphEntry uses both asterisks for italics and backticks for code, while Scratchpad and Pointer use only asterisks. The markup should be consistent: either *`GraphEntry`* style for all planned types, or plain *GraphEntry* for all.

```diff
-//! - *`GraphEntry`*: DAG entries with parent links (planned)
+//! - *GraphEntry*: DAG entries with parent links (planned)
```
DiskStorageConfig::max_chunks was stored but never checked during put(). Now rejects writes when chunks_stored >= max_chunks (when max_chunks > 0). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
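A minimal sketch of the capacity check this commit describes, with 0 treated as "unlimited". Field and function names are illustrative, not the crate's actual API:

```rust
/// Illustrative stand-in for the relevant config field.
struct DiskStorageConfig {
    max_chunks: u64,
}

/// Reject a put once the live chunk count reaches the cap.
/// `max_chunks == 0` disables the limit entirely.
fn can_store(config: &DiskStorageConfig, chunks_stored: u64) -> bool {
    config.max_chunks == 0 || chunks_stored < config.max_chunks
}

fn main() {
    let cfg = DiskStorageConfig { max_chunks: 2 };
    assert!(can_store(&cfg, 0));
    assert!(can_store(&cfg, 1));
    assert!(!can_store(&cfg, 2)); // at capacity: reject the write
    let unlimited = DiskStorageConfig { max_chunks: 0 };
    assert!(can_store(&unlimited, 1_000_000)); // 0 disables the cap
}
```

Checking against the live count (rather than a monotonic counter) is what lets deletions free capacity again, as a later follow-up in this thread notes.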
…ault build_ant_protocol used ..Default::default() on EvmVerifierConfig, which always selected ArbitrumOne regardless of config.payment.evm_network. Now maps the config enum to the evmlib::Network variant explicitly. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
QuantumClient::get_chunk/put_chunk bypassed the ANT protocol by using dht_get/dht_put directly. Rewritten to send ChunkGetRequest/ChunkPutRequest messages over P2P via send_message(), with event-based response handling and configurable timeout. exists() now delegates to get_chunk. Also fixes testnet.rs store_chunk_on/get_chunk_from which used peer_id() (app-level "peer_xxx" format) instead of transport_peer_id() (hex-encoded transport ID used by send_message and P2PEvent::Message.source). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Validates the full client -> P2P -> AntProtocol -> DiskStorage path: - put_chunk stores via ChunkPutRequest over P2P - get_chunk retrieves via ChunkGetRequest over P2P - exists returns true for stored, false for missing chunks - get_chunk returns None for non-existent addresses Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 8 comments.
```rust
/// Reference to the running P2P node.
pub p2p_node: Option<Arc<P2PNode>>,

/// ANT protocol handler for processing chunk PUT/GET requests.
```

The doc comment spells out "ANT protocol" in prose rather than naming the `AntProtocol` struct in code formatting. Update the comment to match the actual type being used:

```diff
-/// ANT protocol handler for processing chunk PUT/GET requests.
+/// `AntProtocol` handler for processing chunk PUT/GET requests.
```
```rust
/// Bootstrap addresses this node connects to.
pub bootstrap_addrs: Vec<SocketAddr>,

/// Protocol handler background task.
```

Missing documentation on when this handle should be aborted and why it's optional. Consider adding a comment explaining the lifecycle (e.g., "Set after node start, aborted during teardown").

```diff
-/// Protocol handler background task.
+/// Protocol handler background task handle.
+///
+/// Set when the protocol task is spawned after node start, and
+/// aborted or awaited during node teardown. Optional so it is `None`
+/// before startup and after shutdown.
```
```rust
// Race all receivers concurrently instead of polling sequentially.
// Pin the deadline sleep once so it tracks cumulative time across loop
// iterations — otherwise select_all always wins the race against a
// freshly-created sleep and the timeout never fires.
```

This comment explains a subtle timing behavior but could be clearer about why pinning is necessary. Consider adding "Pinning prevents the timeout from being recreated on each loop iteration, which would reset its deadline."

```diff
-// freshly-created sleep and the timeout never fires.
+// freshly-created sleep and the timeout never fires. Pinning prevents the
+// timeout from being recreated on each loop iteration, which would reset
+// its deadline.
```
```rust
Ok(false) => {}
}

// 4. Verify payment
```

The comment numbering skips from step 2 (line 139) to step 4. Either add the missing step 3 or renumber to maintain sequential ordering.
src/storage/disk.rs (outdated)
```rust
let chunk_path = self.chunk_path(address);

// Check if already exists (safe under per-address lock)
let file_exists = fs::try_exists(&chunk_path).await.is_ok_and(|v| v);
```

This expression can be simplified to `fs::try_exists(&chunk_path).await.unwrap_or(false)`, which is clearer and handles errors more explicitly.

```diff
-let file_exists = fs::try_exists(&chunk_path).await.is_ok_and(|v| v);
+let file_exists = fs::try_exists(&chunk_path).await.unwrap_or(false);
```
```rust
/// Maximum number of records for quoting metrics.
const DEFAULT_MAX_QUOTING_RECORDS: usize = 100_000;

/// Default rewards address when none is configured (20-byte zero address).
```

Using a zero address as a default rewards address may lead to unintended behavior if payments are enabled. Consider documenting that this is only safe when payment verification is disabled, or use a sentinel value that would fail payment verification explicitly.

```diff
-/// Default rewards address when none is configured (20-byte zero address).
+/// Default rewards address when none is configured.
+///
+/// This uses the 20-byte zero address as a sentinel and MUST ONLY be used
+/// when payment verification is disabled. When payments are enabled, a valid
+/// non-zero rewards address must be configured explicitly.
```
```rust
@@ -108,34 +124,81 @@ impl QuantumClient {
        return Err(Error::Network("P2P node not configured".into()));
    };
```

The `pick_target_peer` function randomly selects a peer, but there's no documentation explaining the selection strategy or how this affects data availability. Consider adding a comment explaining why random selection is used instead of content-addressing or closest-peer routing:

```rust
// Select a random connected peer for this request.
//
// The client intentionally delegates content-addressed routing and
// closest-peer selection to the underlying `saorsa_core` network layer,
// which is responsible for delivering the message to whichever node
// actually holds (or can locate) the chunk. Using a random peer here
// keeps this client stateless and helps distribute load evenly across
// the available peers without maintaining a separate routing table.
```
```rust
let empty_payment = rmp_serde::to_vec(&ant_evm::ProofOfPayment {
    peer_quotes: vec![],
})
```

The empty payment proof creation is duplicated in multiple places (also in `store_chunk_on_peer` in testnet.rs). Consider extracting this into a helper function like `create_empty_payment_proof()` to reduce duplication.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
tests/e2e/testnet.rs:1
- Corrected article 'a' to 'an' before 'AntProtocol' (vowel sound).
//! Test network infrastructure for spawning and managing multiple nodes.
```rust
/// Reference to the running P2P node.
pub p2p_node: Option<Arc<P2PNode>>,

/// ANT protocol handler for processing chunk PUT/GET requests.
```

The comment refers to 'ANT protocol handler' but the field is named `ant_protocol`. For consistency with the `p2p_node` field above (which stores the P2P node itself), consider clarifying that this field stores the handler instance, not just metadata about it. For example:

```diff
-/// ANT protocol handler for processing chunk PUT/GET requests.
+/// ANT protocol handler instance for processing chunk PUT/GET requests.
```
```rust
);

// Validate data size - data_size is u64, cast carefully
let data_size_usize = usize::try_from(request.data_size).unwrap_or(usize::MAX);
```

Using `unwrap_or(usize::MAX)` can cause the size check on line 262 to incorrectly pass when `request.data_size > usize::MAX`, because `usize::MAX` itself may be larger than `MAX_CHUNK_SIZE`. Instead, return an error immediately if the conversion fails:

```rust
let data_size_usize = match usize::try_from(request.data_size) {
    Ok(size) => size,
    Err(_) => {
        return ChunkQuoteResponse::Error {
            request_id: rid,
            error: ProtocolError::ChunkTooLarge {
                size: usize::MAX,
                max_size: MAX_CHUNK_SIZE,
            },
        };
    }
};
```
```rust
"Checking existence on saorsa network: {}",
hex::encode(address)
);
self.get_chunk(address).await.map(|opt| opt.is_some())
```

The `exists()` method downloads the full chunk content and then discards it, which is inefficient for large chunks. The comment on lines 339-342 acknowledges this, but consider adding a TODO or issue reference to track adding a dedicated `ChunkExistsRequest` protocol message for efficient existence checks.
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 4 comments.
Comments suppressed due to low confidence (1)
tests/e2e/testnet.rs:1
- Corrected article usage: 'a AntProtocol' should be 'an AntProtocol' since 'ANT' starts with a vowel sound.
//! Test network infrastructure for spawning and managing multiple nodes.
src/storage/disk.rs (outdated)
```rust
// Atomic write: temp file + sync + rename.
// Use a random suffix to avoid collisions if the per-address lock is
// ever bypassed (e.g., via put_local).
let temp_path = chunk_path.with_extension(format!("tmp.{:016x}", rand::random::<u64>()));
```

The temporary file collision handling with random suffix (line 172) lacks test coverage. Consider adding a test that simulates concurrent `put()` calls for the same address to verify that the random suffix prevents collisions and the per-address lock ensures atomic writes.
```rust
async fn pick_target_peer(node: &P2PNode) -> Result<String> {
    let peers = node.connected_peers().await;
    let mut rng = rand::thread_rng();
    peers
        .into_iter()
        .choose(&mut rng)
        .ok_or_else(|| Error::Network("No connected peers available".into()))
}
```

The random peer selection logic (lines 364-371) isn't covered by tests. Add unit tests for `pick_target_peer` that verify it correctly handles empty peer lists and selects from non-empty lists.
```rust
// Pin the deadline sleep once so it tracks cumulative time across loop
// iterations — otherwise select_all always wins the race against a
// freshly-created sleep and the timeout never fires.
let timeout = tokio::time::sleep_until(deadline);
```

The comment on lines 238-240 explains why the timeout is pinned, but this pattern is subtle enough that it warrants an inline code comment at lines 241-242 explaining that pinning prevents the `select!` from creating a fresh sleep future on each iteration.

```diff
 let timeout = tokio::time::sleep_until(deadline);
+// Pin the timeout so the same sleep future is reused in each select! iteration.
```
```rust
);
RewardsAddress::new(DEFAULT_REWARDS_ADDRESS)
};
let metrics_tracker = QuotingMetricsTracker::new(DEFAULT_MAX_QUOTING_RECORDS, 0);
```

The second argument `0` is unclear: it represents `initial_records`, but passing a literal zero obscures its meaning. Consider using a named constant like `DEFAULT_INITIAL_QUOTING_RECORDS` for clarity.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 8 comments.
src/ant_protocol/chunk.rs (outdated)

```rust
fn bincode_options() -> impl Options {
    bincode::options()
        .with_limit(MAX_WIRE_MESSAGE_SIZE)
        .allow_trailing_bytes()
```

Setting `allow_trailing_bytes()` permits deserialization to succeed even when extra data follows the message. This can mask protocol errors or version mismatches where a peer sends more data than expected. Unless there's a specific reason to allow trailing bytes (e.g., forward compatibility with unknown extensions), consider removing this option to enforce strict message boundaries.

```diff
-        .allow_trailing_bytes()
```
src/storage/disk.rs (outdated)

```rust
// SAFETY: ADDRESS_LOCK_CACHE_CAPACITY is a non-zero constant.
let lock_capacity = NonZeroUsize::new(ADDRESS_LOCK_CACHE_CAPACITY)
    .unwrap_or(NonZeroUsize::MIN);
```

`ADDRESS_LOCK_CACHE_CAPACITY` is a non-zero constant (16_384), so `NonZeroUsize::new()` will never return `None`. The `unwrap_or(NonZeroUsize::MIN)` fallback is unreachable. Replace with a direct `.expect()` or `.unwrap()` with a comment explaining that the constant is known non-zero.

```diff
-    .unwrap_or(NonZeroUsize::MIN);
+    .expect("ADDRESS_LOCK_CACHE_CAPACITY must be non-zero");
```
```rust
);

// Validate data size - data_size is u64, cast carefully
let data_size_usize = usize::try_from(request.data_size).unwrap_or(usize::MAX);
```

Using `unwrap_or(usize::MAX)` on `try_from` silently converts out-of-range u64 values to `usize::MAX`, which then passes the subsequent `> MAX_CHUNK_SIZE` check, but produces a misleading error message about `usize::MAX` bytes. Instead, explicitly handle the conversion error and return a clear error about the value being too large for the platform's address space:

```rust
let data_size_usize = match usize::try_from(request.data_size) {
    Ok(size) => size,
    Err(_) => {
        return ChunkQuoteResponse::Error {
            request_id: rid,
            error: ProtocolError::QuoteFailed(format!(
                "Requested data size {} is too large for this platform's address space",
                request.data_size
            )),
        };
    }
};
```
```rust
//!
//! ## Protocol-Based Testing
//!
//! Each test node includes a `AntProtocol` handler that processes chunk
```

Use 'an' instead of 'a' before 'AntProtocol' since it starts with a vowel sound.

```diff
-//! Each test node includes a `AntProtocol` handler that processes chunk
+//! Each test node includes an `AntProtocol` handler that processes chunk
```
```rust
// Subscribe on every node's event stream *before* sending, so we can
// confirm exactly which node receives the message.
let all_nodes = harness.all_nodes();
let mut all_rx: Vec<_> = all_nodes.iter().map(|n| n.subscribe_events()).collect();
```

This test subscribes to event streams on all 5 nodes but only needs to confirm delivery to any one node. Subscribing to all nodes wastes memory and CPU on event filtering. Consider subscribing only to the target peer's event stream (if discoverable by peer_id) or a smaller subset of likely recipients.
```rust
);

let mut rng = rand::thread_rng();
let target_peer_id = peers.choose(&mut rng).expect("peers is non-empty");
```

The expect message 'peers is non-empty' describes the expectation but doesn't explain what happened. Consider rewording to something like 'failed to choose from non-empty peers list' to clarify that the panic indicates an unexpected state.

```diff
-let target_peer_id = peers.choose(&mut rng).expect("peers is non-empty");
+let target_peer_id = peers
+    .choose(&mut rng)
+    .expect("failed to choose from non-empty peers list");
```
src/client/quantum.rs (outdated)

```rust
/// **This client is NOT safe for concurrent chunk operations.** Each call to
/// [`get_chunk`](Self::get_chunk) or [`put_chunk`](Self::put_chunk) subscribes
/// to the P2P broadcast event stream and filters responses by topic, source,
/// and `request_id`. If multiple operations run concurrently against the same
/// peer, one operation may consume another's response from the broadcast
/// channel, causing the other to time out. Callers must serialize chunk
/// operations or use separate `QuantumClient` instances per concurrent task.
```

The concurrency limitation described here is a significant footgun for API users. Consider storing a single subscription in the client struct (initialized in `new()` or lazily) and sharing it across all operations, using a `HashMap<request_id, oneshot::Sender>` to route responses. This would make concurrent operations safe without requiring users to create multiple client instances.
src/node.rs (outdated)

```rust
"No rewards address configured — using zero address. \
 Payments will be unrecoverable. Set `payment.rewards_address` in config."
```

The warning message spans multiple string literals with inconsistent indentation of the continuation. The extra spaces before 'Payments' create awkward whitespace in the output. Use a single string literal or ensure consistent spacing:

```diff
-"No rewards address configured — using zero address. \
- Payments will be unrecoverable. Set `payment.rewards_address` in config."
+"No rewards address configured — using zero address. Payments will be unrecoverable. Set `payment.rewards_address` in config."
```
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Wrap ChunkMessage enum (now ChunkMessageBody) in a ChunkMessage struct carrying a random u64 request_id. The handler echoes the ID back so callers match responses by request_id instead of source peer ID. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
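A minimal model of this envelope refactor, with illustrative field and variant names (the real messages carry more data and travel bincode-serialized over P2P):

```rust
/// Former enum, now the inner body of every wire message.
#[derive(Debug, PartialEq)]
enum ChunkMessageBody {
    Get { address: [u8; 4] },
    GetResponse { data: Vec<u8> },
}

/// Envelope carrying the correlation ID alongside the body.
#[derive(Debug)]
struct ChunkMessage {
    request_id: u64,
    body: ChunkMessageBody,
}

/// Handler side: echo the request_id back in the response envelope so
/// the caller can match responses by ID instead of by source peer.
fn handle(msg: &ChunkMessage) -> ChunkMessage {
    ChunkMessage {
        request_id: msg.request_id,
        body: ChunkMessageBody::GetResponse { data: vec![1, 2, 3] },
    }
}

fn main() {
    let req = ChunkMessage {
        request_id: 42,
        body: ChunkMessageBody::Get { address: [0; 4] },
    };
    let resp = handle(&req);
    // Caller correlates on request_id, ignoring responses for other requests.
    assert_eq!(resp.request_id, req.request_id);
}
```

Matching on `request_id` (rather than source peer alone) is what lets several in-flight requests to the same peer be disambiguated on a shared event stream.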
A malformed message from any peer on the chunk protocol topic would kill the entire GET/PUT operation. Now decode failures are logged and skipped so only successfully parsed, matching responses affect the caller. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Re-add source == target_peer filtering in chunk protocol event loops. The request_id travels in plaintext, so source validation provides defense-in-depth against spoofed responses from non-target peers. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Use a per-client AtomicU64 counter instead of rand::thread_rng() for chunk protocol request IDs. Sequential IDs guarantee uniqueness, are cheaper than RNG, and produce values that are easier to trace in logs. Moves rand back to dev-dependencies since it is no longer used in production code. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
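The counter approach this commit describes can be sketched in a few lines (struct and method names are illustrative):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Per-client request-ID source: `fetch_add` hands out unique,
/// monotonically increasing IDs without an RNG, and the sequential
/// values are easy to follow in logs.
struct RequestIds {
    next: AtomicU64,
}

impl RequestIds {
    fn new() -> Self {
        Self { next: AtomicU64::new(0) }
    }

    /// Returns the current value and atomically advances the counter,
    /// so concurrent callers never receive the same ID.
    fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}

fn main() {
    let ids = RequestIds::new();
    assert_eq!(ids.next_id(), 0);
    assert_eq!(ids.next_id(), 1);
    assert_eq!(ids.next_id(), 2); // strictly increasing, never repeats
}
```

`Ordering::Relaxed` suffices here because only the uniqueness of the returned values matters, not any ordering relative to other memory operations.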
Pull request overview
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
tests/e2e/testnet.rs:197
- The test imports `rand::Rng` but doesn't directly use it; only `SliceRandom::choose` is needed. Consider importing `rand::seq::SliceRandom` explicitly at the top instead of the blanket `rand::Rng` to make dependencies clearer.

```rust
let mut rng = rand::thread_rng();
```
```rust
);

// Validate data size - data_size is u64, cast carefully
let data_size_usize = usize::try_from(request.data_size).unwrap_or(usize::MAX);
```

Using `unwrap_or(usize::MAX)` silently converts out-of-range u64 values to `usize::MAX`, which could bypass the `MAX_CHUNK_SIZE` check on 32-bit platforms. Return a `ChunkTooLarge` error immediately if `try_from` fails instead:

```rust
let data_size_usize = match usize::try_from(request.data_size) {
    Ok(size) => size,
    Err(_) => {
        // If the u64 value cannot be represented as usize (e.g. on 32-bit),
        // treat it as exceeding the maximum allowed chunk size.
        return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
            size: MAX_CHUNK_SIZE.saturating_add(1),
            max_size: MAX_CHUNK_SIZE,
        });
    }
};
```
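The guard the reviewers converge on can be illustrated standalone. The constant value and the string error type below are simplified assumptions, not the crate's actual `ProtocolError`:

```rust
/// Illustrative chunk-size cap (the PR mentions a 4 MB limit elsewhere).
const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024;

/// Fail fast when `data_size` cannot be represented as `usize`, instead
/// of clamping to `usize::MAX` and relying on a later comparison with a
/// misleading error message.
fn validate_size(data_size: u64) -> Result<usize, String> {
    let size = usize::try_from(data_size)
        .map_err(|_| format!("data_size {data_size} exceeds the platform address space"))?;
    if size > MAX_CHUNK_SIZE {
        return Err(format!("chunk of {size} bytes exceeds max {MAX_CHUNK_SIZE}"));
    }
    Ok(size)
}

fn main() {
    assert_eq!(validate_size(1024), Ok(1024));
    assert!(validate_size((MAX_CHUNK_SIZE as u64) + 1).is_err());
    assert!(validate_size(u64::MAX).is_err()); // rejected on every platform
}
```

On 64-bit targets the `try_from` never fails, so the clamp bug is invisible there; the explicit error path matters precisely on 32-bit builds, which is why the reviewers flag it repeatedly.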
```rust
Ok(None) => {
    debug!("Chunk {} not found on saorsa network", hex::encode(address));
    Ok(None)
let target_peer = Self::pick_target_peer(node).await?;
```

The `pick_target_peer` helper always selects the first connected peer via `.next()`, which is deterministic rather than random. This contradicts the method name's implication. Either rename to `pick_first_peer` or implement actual random selection if distribution is important.
…structor The same log message was emitted in both AntProtocol::new() and NodeBuilder::build_ant_protocol(). Keep it in the caller where it belongs and remove the side effect from the constructor. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
bincode v1's default deserialize has no size limit — a malicious peer can send a crafted length prefix causing unbounded memory allocation. Switch encode/decode to use DefaultOptions::new().with_limit() capped at MAX_CHUNK_SIZE + 1 MB (headroom for the envelope fields). This rejects oversized messages at the deserialization layer before any allocation occurs. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
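With bincode v1 this cap is set via `DefaultOptions::new().with_limit(..)`, as the commit says. As a dependency-free illustration of the same invariant, the sketch below checks the wire length against the cap before doing any work; the constants follow the commit message and the decode body is a placeholder:

```rust
/// Illustrative limits: 4 MB max chunk plus 1 MB headroom for the
/// envelope fields, matching the commit description.
const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024;
const MAX_WIRE_MESSAGE_SIZE: usize = MAX_CHUNK_SIZE + 1024 * 1024;

/// Reject an oversized wire message up front, before any allocation
/// or parsing. A crafted length prefix can then never drive unbounded
/// memory allocation.
fn decode_checked(wire_bytes: &[u8]) -> Result<Vec<u8>, String> {
    if wire_bytes.len() > MAX_WIRE_MESSAGE_SIZE {
        return Err(format!(
            "message of {} bytes exceeds limit {MAX_WIRE_MESSAGE_SIZE}",
            wire_bytes.len()
        ));
    }
    // Real code would hand the bounded slice to the bincode decoder here.
    Ok(wire_bytes.to_vec())
}

fn main() {
    assert!(decode_checked(&[0u8; 16]).is_ok());
    assert!(decode_checked(&vec![0u8; MAX_WIRE_MESSAGE_SIZE + 1]).is_err());
}
```

bincode's `with_limit` goes further than this sketch: it also bounds allocations implied by length prefixes *inside* the payload, which a plain outer-length check cannot catch.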
Replace 4 duplicated subscribe-send-poll event loops (~40-50 lines each) in quantum.rs and testnet.rs with a single generic send_and_await_chunk_response helper function. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Clippy error from the …
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
```rust
timeout_secs: DEFAULT_TIMEOUT_SECS,
replica_count: DEFAULT_REPLICA_COUNT,
```

The `replica_count` field is initialized but never used in the implementation. Consider removing it or documenting when it will be implemented to avoid confusion.
```diff
@@ -24,11 +24,10 @@ path = "src/bin/keygen.rs"

 [dependencies]
 # Core (provides EVERYTHING: networking, DHT, security, trust, storage)
```

The PR description states that saorsa-labs/saorsa-core#11 must be merged before this PR can land, but this dependency version (0.10.2) appears to already be set. Consider adding a comment explaining the version requirement or verifying this is the correct version.

```diff
 # Core (provides EVERYTHING: networking, DHT, security, trust, storage)
+# NOTE: saorsa-core 0.10.2 includes changes from saorsa-labs/saorsa-core#11 required by this crate.
```
|
Pushed a follow-up with three fixes:\n- now verifies that the returned address matches the request and that the payload hashes to that address before handing it back to callers.\n- keeps an accurate count (with a startup scan) and enforces against the live count so deleting data frees capacity again.\n- The shared chunk protocol helper treats as a recoverable condition so the client can keep waiting instead of bailing out early on busy nodes.\n\nTests: test result: ok. 5 passed; 0 failed; 0 ignored; 0 measured; 130 filtered out; finished in 0.00s running 0 tests test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s running 0 tests test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s running 0 tests test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 88 filtered out; finished in 0.00s and test result: ok. 11 passed; 0 failed; 0 ignored; 0 measured; 124 filtered out; finished in 0.01s running 0 tests test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s running 0 tests test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s running 0 tests test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 88 filtered out; finished in 0.00s. |
|
|
@greptile please review. @claude, you too.
|
Claude finished @dirvine's task (View job)

Code Review Complete ✅

I've conducted a thorough review of this PR implementing the chunk protocol, storage handler, and cross-node e2e tests. Overall, this is high-quality work with good architecture and comprehensive testing. Below are my findings:

✅ Strengths

- Architecture & Design
- Code Quality
|
Greptile Overview

Greptile Summary

This PR introduces a complete chunk storage protocol with cross-node messaging, disk persistence, and comprehensive e2e testing infrastructure.

Key Changes

Code Quality

Confidence Score: 4/5
| Filename | Overview |
|---|---|
| src/ant_protocol/chunk.rs | New chunk protocol with bincode serialization, proper size limits, and comprehensive test coverage |
| src/storage/handler.rs | ANT protocol handler with payment verification and disk storage; fallback to usize::MAX in quote handler could mask u64 overflow |
| src/storage/disk.rs | Content-addressed storage with atomic writes, two-level sharding, and comprehensive validation |
| src/node.rs | Protocol routing integration spawns background task to handle P2P messages and dispatch to AntProtocol |
| tests/e2e/integration_tests.rs | Comprehensive e2e tests for node-to-node messaging and QuantumClient chunk operations with clean teardown |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client as Node 3 (Client)
    participant P2P as P2P Layer
    participant Handler as AntProtocol Handler
    participant Payment as PaymentVerifier
    participant Storage as DiskStorage
    participant Peer as Target Peer Node
    Note over Client,Peer: Chunk PUT Operation
    Client->>Client: compute_address(content)
    Client->>Client: ChunkPutRequest.encode()
    Client->>P2P: send_message(peer, CHUNK_PROTOCOL_ID, bytes)
    P2P->>Peer: transmit over QUIC
    Peer->>Handler: P2PEvent::Message received
    Handler->>Handler: ChunkMessage.decode()
    Handler->>Handler: validate chunk size ≤ 4MB
    Handler->>Handler: verify SHA256(content) == address
    Handler->>Storage: exists(address)?
    alt Chunk already exists
        Storage-->>Handler: true
        Handler->>Handler: ChunkPutResponse::AlreadyExists
    else New chunk
        Storage-->>Handler: false
        Handler->>Payment: verify_payment(address, proof)
        Payment-->>Handler: PaymentStatus::can_store()
        Handler->>Storage: put(address, content)
        Storage->>Storage: atomic write (temp + rename)
        Storage-->>Handler: Ok(true)
        Handler->>Handler: ChunkPutResponse::Success
    end
    Handler->>Handler: ChunkMessage.encode(response)
    Handler->>P2P: send_response to source peer
    P2P->>Client: ChunkPutResponse delivered
    Note over Client,Peer: Chunk GET Operation
    Client->>Client: ChunkGetRequest.encode()
    Client->>P2P: send_message(peer, CHUNK_PROTOCOL_ID, bytes)
    P2P->>Peer: transmit over QUIC
    Peer->>Handler: P2PEvent::Message received
    Handler->>Handler: ChunkMessage.decode()
    Handler->>Storage: get(address)
    Storage->>Storage: read from disk + verify hash
    Storage-->>Handler: Ok(Some(content))
    Handler->>Handler: ChunkGetResponse::Success
    Handler->>Handler: ChunkMessage.encode(response)
    Handler->>P2P: send_response to source peer
    P2P->>Client: ChunkGetResponse with content
```
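The "atomic write (temp + rename)" step in the PUT flow can be sketched with the standard library alone. `shard_path` and `put_chunk` are hypothetical names, not the crate's actual API; the pattern is simply: write to a temp file, then rename into the final two-level-sharded location so readers never observe a partially written chunk:

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

// Two-level sharding: the first two hex-pairs of the address pick the dirs,
// e.g. "abcd1234" -> <root>/ab/cd/abcd1234.
fn shard_path(root: &Path, address_hex: &str) -> PathBuf {
    root.join(&address_hex[0..2])
        .join(&address_hex[2..4])
        .join(address_hex)
}

fn put_chunk(root: &Path, address_hex: &str, content: &[u8]) -> io::Result<PathBuf> {
    let final_path = shard_path(root, address_hex);
    fs::create_dir_all(final_path.parent().expect("sharded path has a parent"))?;
    let tmp_path = final_path.with_extension("tmp");
    fs::write(&tmp_path, content)?; // a crash here leaves only a stray .tmp
    fs::rename(&tmp_path, &final_path)?; // atomic within one filesystem
    Ok(final_path)
}
```

The rename is what makes the write atomic from a reader's perspective: a concurrent GET either sees no chunk at `final_path` or the complete one, never a half-written file.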
The reviewed lines in `src/storage/handler.rs`:

```rust
// Validate data size - data_size is u64, cast carefully
let data_size_usize = usize::try_from(request.data_size).unwrap_or(usize::MAX);
```
falling back to `usize::MAX` on u64 overflow could mask invalid requests on 32-bit systems

```suggestion
let data_size_usize = match usize::try_from(request.data_size) {
    Ok(size) => size,
    Err(_) => {
        return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
            size: MAX_CHUNK_SIZE + 1,
            max_size: MAX_CHUNK_SIZE,
        });
    }
};
```
How can I resolve this? If you propose a fix, please make it concise.
The issue is that falling back to usize::MAX masks potential overflow errors. Instead, we should return an error when the data size exceeds what can be represented on the target platform.
Here's the fix:
```suggestion
// Validate data size - reject if too large for platform
let data_size_usize = match usize::try_from(request.data_size) {
    Ok(size) => size,
    Err(_) => {
        return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
            size: usize::try_from(request.data_size).unwrap_or(usize::MAX), // saturate for error reporting
            max_size: usize::MAX,
        });
    }
};
```
This change:

- Explicitly handles the conversion failure instead of silently masking it
- Returns a proper error when the requested data size is too large for the platform
- Uses the existing `ChunkTooLarge` error variant, which is semantically appropriate
- Avoids potential issues on 32-bit systems where `u64::MAX` cannot fit in `usize`
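As a compilable illustration of the explicit-handling pattern discussed here (`SizeCheck` and `validate_data_size` are invented names; the real handler returns a `ChunkQuoteResponse`), a failed `u64` to `usize` conversion is rejected the same way as a size over the protocol cap, rather than saturating to `usize::MAX`:

```rust
// 4 MB cap matching the chunk protocol's stated maximum.
const MAX_CHUNK_SIZE: usize = 4 * 1024 * 1024;

#[derive(Debug, PartialEq)]
enum SizeCheck {
    Ok(usize),
    TooLarge(u64),
}

// Convert the wire-format u64 explicitly: a conversion failure (possible on
// 32-bit targets, where usize is 32 bits) is rejected instead of being
// masked by a saturating fallback.
fn validate_data_size(data_size: u64) -> SizeCheck {
    match usize::try_from(data_size) {
        Ok(size) if size <= MAX_CHUNK_SIZE => SizeCheck::Ok(size),
        _ => SizeCheck::TooLarge(data_size),
    }
}
```

On a 64-bit host the `Err` arm is unreachable, but the `size <= MAX_CHUNK_SIZE` guard still catches oversized requests, so both platforms reject the same inputs.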
Addressed the outstanding review feedback: …

Tests: …
dirvine left a comment:

Excellent @mickvandijke, I am happy if you are?
Yes, great fixes @dirvine 👍
Add unit and e2e tests covering the remaining Section 18 scenarios:

Unit tests (32 new):
- Quorum: #4 fail→abandoned, #16 timeout→inconclusive, #27 single-round dual-evidence, #28 dynamic threshold undersized, #33 batched per-key, #34 partial response unresolved, #42 quorum-derived paid-list auth
- Admission: #5 unauthorized peer, #7 out-of-range rejected
- Config: #18 invalid config rejected, #26 dynamic paid threshold
- Scheduling: #8 dedup safety, #8 replica/paid collapse
- Neighbor sync: #35 round-robin cooldown skip, #36 cycle completion, #38 snapshot stability mid-join, #39 unreachable removal + slot fill, #40 cooldown peer removed, #41 cycle termination guarantee, consecutive rounds, cycle preserves sync times
- Pruning: #50 hysteresis prevents premature delete, #51 timestamp reset on heal, #52 paid/record timestamps independent, #23 entry removal
- Audit: #19/#53 partial failure mixed responsibility, #54 all pass, #55 empty failure discard, #56 repair opportunity filter, response count validation, digest uses full record bytes
- Types: #13 bootstrap drain, repair opportunity edge cases, terminal state variants
- Bootstrap claims: #46 first-seen recorded, #49 cleared on normal

E2e tests (4 new):
- #2 fresh offer with empty PoP rejected
- #5/#37 neighbor sync request returns response
- #11 audit challenge multi-key (present + absent)
- Fetch not-found for non-existent key

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- Chunk protocol (`ant_protocol::chunk`): bincode-serialized wire protocol for PUT/GET/Quote operations with content-addressed chunks (SHA256, max 4MB)
- Disk storage (`storage::disk`): content-addressed persistence with two-level sharded directories and atomic writes
- Storage handler (`storage::handler`): `AntProtocol` handler that routes incoming chunk messages through payment verification and disk storage
- Node integration (`RunningNode`): subscribes to P2P events and dispatches chunk protocol messages to `AntProtocol`, sending responses back over P2P
- E2e test: sends `ChunkPutRequest`/`ChunkGetRequest` to a random connected peer and verifies the round-trip
- Messaging test: validates the `send_message`/`subscribe_events` layer with proper timeout handling and clean teardown
- Misc: `XorName` …, stale doc corrections

Dependencies
Test plan
- `cargo test`: 174 tests pass (132 lib + 41 e2e + 1 doctest), 0 failures
- `cargo test test_node_to_node_messaging -- --ignored`: passes in ~26s with clean teardown
- `cargo clippy --all-targets --all-features -- -D warnings`: clean
- `cargo fmt --all -- --check`: clean

🤖 Generated with Claude Code
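The protocol routing described in the summary (a background loop that receives P2P events and spawns a worker per chunk message) can be sketched std-only. The production node spawns async tasks over a real P2P event stream; here an mpsc channel and OS threads are stand-ins, and all names are illustrative:

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in protocol id; the real crate defines its own identifier.
const CHUNK_PROTOCOL_ID: &str = "ant/chunk";

struct P2pMessage {
    protocol: &'static str,
    payload: Vec<u8>,
}

// Stand-in for decode + payment verification + storage; returns payload size.
fn handle_chunk_message(payload: Vec<u8>) -> usize {
    payload.len()
}

// Drain the event stream, spawning one worker per chunk message so a slow
// storage operation never blocks routing of the next incoming message.
// Messages for other protocols are ignored here.
fn route(rx: mpsc::Receiver<P2pMessage>) -> Vec<usize> {
    let mut workers = Vec::new();
    for msg in rx {
        if msg.protocol == CHUNK_PROTOCOL_ID {
            workers.push(thread::spawn(move || handle_chunk_message(msg.payload)));
        }
    }
    workers.into_iter().map(|w| w.join().expect("worker panicked")).collect()
}
```

Spawning per message mirrors the PR's "task per message for concurrent processing" design: routing stays responsive even when an individual PUT is waiting on disk or payment checks.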